#include "channel.h"
#include "channel_manager.h"
#include "log.h"
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <memory>
#include <utility>
#include <asio.hpp>

// Wraps an already-constructed TCP socket in a channel.
//
// pChannelMgr  - manager this channel talks to; stored as a raw pointer.
// socket       - socket that is moved into the channel.
// pIoContext   - io_context backing the heartbeat timer; also stored as a
//                raw pointer and used later for posting writes.
// nChannalType - role tag that the message handlers in this file compare
//                against ChannalType::TCPCLIENT / ChannalType::TCPSERVER.
Channel::Channel(ChannelManager *pChannelMgr,
	asio::ip::tcp::socket socket,
	asio::io_context *pIoContext,
	ChannalType nChannalType)
	: m_Socket(std::move(socket)), m_timerHeartBeat(*pIoContext)
{
	// Collaborators and role.
	m_pChannelMgr = pChannelMgr;
	m_pIoContext = pIoContext;
	m_channalType = nChannalType;

	// No id yet (-1) and no write in flight.
	m_nChannelID = -1;
	m_bSending = false;

	// Traffic counters start from zero.
	m_nSendBytes = 0;
	m_nSendTimes = 0;
	m_nRecvBytes = 0;
	m_nRecvTimes = 0;

	// Timestamps: creation time and the initial "last activity" mark.
	m_tCreateTime = std::time(nullptr);
	m_nLastActiveTime = TimeNow();
}

// Frees outbound packets that were queued but never handed to async_write.
//
// write()/SendMsg() allocate every packet with new[] and push the raw pointer
// into m_dataQueue; DoWrite() only frees a buffer after it has popped it and
// its async_write completed. Anything still sitting in the queue when the
// channel is destroyed would therefore leak without this loop.
Channel::~Channel()
{
	while (!m_dataQueue.empty())
	{
		delete[] m_dataQueue.front().pData;
		m_dataQueue.pop();
	}
}

void Channel::DoClose()
{
	m_pChannelMgr->DeleteService(shared_from_this());
	if (m_Socket.available())
	{
		m_Socket.shutdown(asio::ip::tcp::socket::shutdown_both);
		m_Socket.close();
	}
	
	if (m_channalType == ChannalType::TCPCLIENT)
	{
		if (m_pChannelMgr->m_pCliSpi)
		{
			Session_t session;
			session.nSessionID = m_nChannelID;
			m_pChannelMgr->m_pCliSpi->OnDisConnect(session);
		}
		//作为客户端时触发断线自动重连
		if (m_pChannelMgr->m_working)
		{
			LOGW("reconnect [%s:%d]",m_strRemoteIp.c_str(),m_uRemotePort);
			m_pChannelMgr->InternalConnect(m_strRemoteIp, m_uRemotePort);
		}
	}
	else 
	{
		if (m_pChannelMgr->m_pSvrSpi)
		{
			Session_t session;
			session.nSessionID = m_nChannelID;
			m_pChannelMgr->m_pSvrSpi->OnDisConnect(session);
		}
	}
}

void Channel::StartRead()
{
	m_uRemotePort = m_Socket.remote_endpoint().port();
	m_strRemoteIp = m_Socket.remote_endpoint().address().to_string();
	DoReadHead();
// 	if (m_channalType == ChannalType::TCPCLIENT)
// 	{
// 		PumpHeartBeat();
// 	}
}

// Starts an asynchronous read of exactly sizeof(NetHead) bytes into m_head.
// On success the body read is started; on failure the error is logged and
// the channel is closed. The captured shared_ptr keeps the channel alive
// until the handler has run.
void Channel::DoReadHead()
{
	auto keepAlive = shared_from_this();
	auto onHead = [this, keepAlive](asio::error_code ec, std::size_t /*length*/)
	{
		if (!ec)
		{
			DoReadBody();
			return;
		}
		LOGW("socket error[%d:%s] from [%s:%d]", ec.value(),ec.message().c_str(),m_strRemoteIp.c_str(),m_uRemotePort);
		DoClose();
	};
	asio::async_read(m_Socket, asio::buffer(&m_head, sizeof(NetHead)), std::move(onHead));
}


// Reads the body announced by the header in m_head, dispatches it through
// OnReceive() and only then starts reading the next header.
//
// The next header read used to be issued immediately, while the body read
// was still outstanding. That put two composed async_read operations on the
// same socket at once (asio requires that no other read runs on the stream
// until async_read completes) and let the second one overwrite m_head before
// OnReceive() had looked at it.
//
// A header whose nBodySize is not positive carries no body: nothing is
// dispatched and the next header is read straight away, as before.
// NOTE(review): nBodySize comes from the peer and is not range-checked, so a
// hostile value can request a very large allocation - consider a protocol cap.
void Channel::DoReadBody()
{
	const int64_t nBodySize = m_head.nBodySize;
	if (nBodySize <= 0)
	{
		DoReadHead();
		return;
	}

	auto self(shared_from_this());
	m_body.resize(static_cast<std::size_t>(nBodySize));
	asio::async_read(m_Socket, asio::buffer(&m_body[0], m_body.size()),
		[this, self](asio::error_code ec, std::size_t /*length*/)
	{
		if (ec)
		{
			LOGW("error:%d:%s", ec.value(), ec.message().c_str());
			DoClose();
			return;
		}
		OnReceive();
		// m_head and m_body are free to be reused now.
		DoReadHead();
	});
}

// Dispatches the message currently held in m_head / m_body.
//
//  ShakeHand    - client: handle the handshake response;
//                 server: handle the handshake request.
//  MsgHeartBeat - client: refresh the last-activity time;
//                 server: log and answer with a heartbeat.
//  MsgContext   - hand the body (moved out of m_body) to the client or server
//                 SPI; a client also refreshes its last-activity time first.
// Any other message type is logged as an error.
void Channel::OnReceive()
{
	const bool bClient = (m_channalType == ChannalType::TCPCLIENT);
	const bool bServer = (m_channalType == ChannalType::TCPSERVER);

	switch (m_head.nMsgType)
	{
	case ShakeHand: // handshake
	{
		if (bClient)
		{
			OnRecvShakeHandRsp();
		}
		else if (bServer)
		{
			OnRecvShakeHandReq();
		}
		break;
	}
	case MsgHeartBeat: // heartbeat packet
	{
		if (bClient)
		{
			m_nLastActiveTime = TimeNow();
		}
		else if (bServer)
		{
			LOGD("recv heartbeat msg");
			HeartBeat();
		}
		break;
	}
	case MsgContext: // application payload
	{
		if (!bClient && !bServer)
		{
			LOGE("channel type error");
			break;
		}

		if (bClient)
		{
			m_nLastActiveTime = TimeNow(); // refresh the activity timestamp
			if (!m_pChannelMgr->m_pCliSpi)
			{
				LOGW("cannot callback,because spi is null");
				break;
			}
			Session_t session;
			session.nSessionID = m_nChannelID;
			m_pChannelMgr->m_pCliSpi->OnMessage(session, std::move(m_body));
		}
		else
		{
			if (!m_pChannelMgr->m_pSvrSpi)
			{
				LOGW("cannot callback,because spi is null");
				break;
			}
			Session_t session;
			session.nSessionID = m_nChannelID;
			m_pChannelMgr->m_pSvrSpi->OnMessage(session, std::move(m_body));
		}
		break;
	}
	default:
	{
		LOGE("msg type error");
		break;
	}
	}
}

// The client side already reconnects automatically after a disconnect, so the heartbeat mechanism is not enabled for now.
// Arms the heartbeat timer for 3 seconds. When it fires without error, a
// heartbeat is sent if more than 5 seconds have passed since the last
// recorded activity, and the timer is re-armed. A timer error (for example
// cancellation) ends the cycle.
//
// Fixes over the previous version:
//  - the timestamps are 64-bit but were printed with "%d", which is undefined
//    behaviour for printf-style formatting; they are now cast to long long
//    and printed with "%lld";
//  - the difference is kept in 64 bits instead of being narrowed to int32_t;
//  - expires_after() replaces the deprecated expires_from_now().
void Channel::PumpHeartBeat()
{
	LOGD("register heartbeat");
	auto self(shared_from_this());
	m_timerHeartBeat.expires_after(std::chrono::seconds(3));
	m_timerHeartBeat.async_wait([self, this](const asio::error_code& ec)
	{
		if (ec)
		{
			return;
		}
		const int64_t tNow = TimeNow();
		const int64_t diff = tNow - m_nLastActiveTime;
		LOGD("now=%lld,last=%lld,diff=%lld",
			static_cast<long long>(tNow),
			static_cast<long long>(m_nLastActiveTime),
			static_cast<long long>(diff));
		if (diff > 5)
		{
			HeartBeat();
		}
		PumpHeartBeat();
	});
}

void Channel::HeartBeat()
{
	LOGD("send heartbeat msg");
	const int nPackageLen = sizeof(NetHead) + sizeof(HeartBeatReq);
	char data[nPackageLen] = { 0 };
	auto pHead = (NetHead*)data;
	pHead->nHeadSize = sizeof(NetHead);
	pHead->nBodySize = sizeof(ShakeHandReq);
	pHead->nMsgType = MsgHeartBeat;

	auto pReq = (HeartBeatReq*)(data + pHead->nHeadSize);
	strcpy(pReq->data, "alive");
	write(data, nPackageLen);
	LOGD("send heartbeat msg");
}

void Channel::DoReqShakeHand()
{
	const int nPackageLen = sizeof(NetHead) + sizeof(ShakeHandReq);
	char data[nPackageLen] = { 0 };
	auto pHead = (NetHead*)data;
	pHead->nHeadSize = sizeof(NetHead);
	pHead->nBodySize = sizeof(ShakeHandReq);
	pHead->nMsgType = ShakeHand;
	//auto pReq = (ShakeHandReq*)(data + pHead->nHeadSize);
	write(data, nPackageLen);
	StartRead();
}

void Channel::OnRecvShakeHandReq()
{
	const int nPackageLen = sizeof(NetHead) + sizeof(ShakeHandRsp);
	char buf[nPackageLen] = { 0 };
	auto pRspHead = (NetHead*)(buf);
	pRspHead->nHeadSize = sizeof(NetHead);
	pRspHead->nBodySize = sizeof(ShakeHandRsp);
	pRspHead->nMsgType = m_head.nMsgType;
	//auto pRsp = (ShakeHandRsp*)(buf + pRspHead->nHeadSize);
	int iChannelID = m_pChannelMgr->AddService(shared_from_this());
	write(buf, nPackageLen);
	if (m_pChannelMgr->m_pSvrSpi)
	{
		Session_t session;
		session.nSessionID = m_nChannelID;
		m_pChannelMgr->m_pSvrSpi->OnConnected(session);
	}
}

void Channel::OnRecvShakeHandRsp()
{
	auto pRsp = (ShakeHandRsp*)(m_body.data());
	if (pRsp->nResult == 0) 
	{
		LOGE("ShakeHand success!");
	}
	int iChannelID = m_pChannelMgr->AddService(shared_from_this());
	if (m_pChannelMgr->m_pCliSpi)
	{
		Session_t session;
		session.nSessionID = m_nChannelID;
		m_pChannelMgr->m_pCliSpi->OnConnected(session);
	}
}

void Channel::write(char *data, std::size_t length) 
{
	auto self(shared_from_this());
	char *pData = new char[length];
	memcpy(pData, data, length);

	m_pIoContext->post([self, pData, length]() 
	{
		Node node = { pData,length };
		self->m_dataQueue.push(node);
		self->DoWrite();
	});
}

void Channel::DoWrite()
{
	if ( m_bSending || m_dataQueue.empty()) 
	{
		return;
	}

	auto self(shared_from_this());

	m_bSending = true;
	auto d = m_dataQueue.front();
	m_dataQueue.pop();
	auto pData = d.pData;
	asio::async_write(m_Socket, asio::buffer(pData, d.nLen),
		[this, self, pData](asio::error_code ec, std::size_t length)
	{
		delete[]pData;
		m_bSending = false;
		if (ec)
		{
			if (m_Socket.available())
			{
				DoClose();
			}
		}
		else 
		{
			m_nSendBytes += length;
			m_nSendTimes++;
			DoWrite();
		}

	});
}

void Channel::SendMsg(const char* pMsg, std::size_t nDataLen)
{
	auto self(shared_from_this());
	
	int64_t nPackageLen = sizeof(NetHead) + nDataLen;
	char *pData = new char[nPackageLen];
	memset(pData, 0, nPackageLen);

	auto pHead = (NetHead*)pData;
	pHead->nMagic = 0;
	pHead->nHeadSize = sizeof(NetHead);
	pHead->nBodySize = nDataLen;
	pHead->nMsgType = MsgContext;

	memcpy(pData + pHead->nHeadSize, pMsg, nDataLen);

	m_pIoContext->post([self, pData, nPackageLen]() 
	{
		Node de = { pData,nPackageLen };
		self->m_dataQueue.push(de);
		self->DoWrite();
	});
}

// Returns the current system_clock time as whole seconds since the clock's
// epoch.
int64_t Channel::TimeNow()
{
	using namespace std::chrono;
	const auto sinceEpoch = system_clock::now().time_since_epoch();
	return duration_cast<seconds>(sinceEpoch).count();
}